Skip to content

Add layered MergePipeline for multi-source entity discovery#246

Open
bburda wants to merge 6 commits intomainfrom
feature/merge-pipeline
Open

Add layered MergePipeline for multi-source entity discovery#246
bburda wants to merge 6 commits intomainfrom
feature/merge-pipeline

Conversation

@bburda
Copy link
Collaborator

@bburda bburda commented Mar 4, 2026

Pull Request

Summary

Replace ad-hoc merging in HybridDiscoveryStrategy with a formal layered MergePipeline that orchestrates manifest, runtime, and plugin discovery sources with configurable per-field-group merge precedence (AUTHORITATIVE/ENRICHMENT/FALLBACK). Adds conflict detection, gap-fill filtering, runtime linker determinism fixes, and diagnostic reporting via /health endpoint.

Commit overview

  1. feat(discovery): add layered MergePipeline with field-group merge - MergePolicy/FieldGroup enums, MergeReport, DiscoveryLayer interface, MergePipeline core with single/multi-layer merge and conflict resolution
  2. feat(discovery): add ManifestLayer, RuntimeLayer, and PluginLayer - Concrete layer wrappers with per-field-group policies, GapFillConfig for namespace filtering
  3. feat(discovery): add post-merge RuntimeLinker and wire into HybridDiscoveryStrategy - App-to-node binding with multi-match detection, orphan warnings, deterministic namespace matching; replaces ad-hoc merge logic in HybridDiscoveryStrategy
  4. feat(discovery): add plugin integration, config, and /health reporting - IntrospectionProvider integration, YAML gap-fill config, MergeReport in /health, IntrospectionInput context passing, comprehensive docs
  5. test(plugins): add vendor extension demo and integration test - VendorExtensionPlugin demo + integration test for plugin layer discovery
  6. fix(build): enable POSITION_INDEPENDENT_CODE for MODULE targets - Fix TPOFF32/R_X86_64_PC32 linker errors for test_gateway_plugin.so

Issue


Type

  • Bug fix
  • New feature or tests
  • Breaking change
  • Documentation only

Testing

  • 80 unit tests covering merge pipeline (42) and runtime linker (38)
  • All 2410 tests passing across 7 packages (0 failures)
  • 30 integration tests passing
  • Tests cover: single/multi/three-layer merge, all merge policies (AUTH/ENRICH/FALLBACK), conflict detection, gap-fill namespace filtering (whitelist/blacklist with path-segment boundaries), App STATUS bool-or semantics, layer exception safety, orphan policies (ERROR/WARN/IGNORE/INCLUDE_AS_ORPHAN), wildcard determinism, node exclusivity, binding conflicts, plugin context passing

Checklist

  • Breaking changes are clearly described (and announced in docs / changelog if needed)
  • Tests were added or updated if needed
  • Docs were updated if behavior or public API changed

Copilot AI review requested due to automatic review settings March 4, 2026 19:26
@bburda bburda marked this pull request as draft March 4, 2026 19:27
@bburda bburda self-assigned this Mar 4, 2026
@bburda bburda added enhancement New feature or request discovery Discovery endpoints or strategies labels Mar 4, 2026
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Introduces a formal, layered discovery merge pipeline for HYBRID discovery mode, replacing the prior ad-hoc hybrid merging/linking approach and exposing pipeline diagnostics via the /health endpoint.

Changes:

  • Added DiscoveryLayer + concrete layers (ManifestLayer, RuntimeLayer, PluginLayer) and a MergePipeline with per-field-group merge policies, conflict/collision detection, and gap-fill filtering.
  • Updated HYBRID discovery to cache merged results and to run a post-merge RuntimeLinker step with deterministic namespace/topic matching and additional conflict reporting.
  • Exposed merge-pipeline diagnostics in /api/v1/health and added extensive unit tests for the pipeline and linker behavior.

Reviewed changes

Copilot reviewed 24 out of 24 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
src/ros2_medkit_gateway/test/test_runtime_linker.cpp Adds deterministic namespace/topic matching and conflict scenario tests for RuntimeLinker.
src/ros2_medkit_gateway/test/test_merge_pipeline.cpp New unit tests covering merge policies, layers, gap-fill config, plugin layer behavior, and post-merge linking.
src/ros2_medkit_gateway/src/plugins/plugin_manager.cpp Adds API to return introspection providers along with their plugin names.
src/ros2_medkit_gateway/src/http/handlers/health_handlers.cpp Adds discovery mode/strategy and merge-pipeline report summary to /health.
src/ros2_medkit_gateway/src/gateway_node.cpp Adds merge-pipeline gap-fill params; registers introspection plugins as pipeline layers in HYBRID; adjusts cache refresh flow.
src/ros2_medkit_gateway/src/discovery/merge_pipeline.cpp Implements merge pipeline execution, per-field-group merges, conflicts, ID collision logging, and post-merge app-to-node linking.
src/ros2_medkit_gateway/src/discovery/manifest/runtime_linker.cpp Adds path-segment-boundary matching, deterministic candidate selection, and binding conflict counters/warnings.
src/ros2_medkit_gateway/src/discovery/layers/runtime_layer.cpp New runtime discovery layer with gap-fill filtering and default merge policies.
src/ros2_medkit_gateway/src/discovery/layers/plugin_layer.cpp New plugin discovery layer wrapping IntrospectionProvider output.
src/ros2_medkit_gateway/src/discovery/layers/manifest_layer.cpp New manifest discovery layer wrapping ManifestManager.
src/ros2_medkit_gateway/src/discovery/hybrid_discovery.cpp Refactors HYBRID discovery to use and cache MergePipeline output.
src/ros2_medkit_gateway/src/discovery/discovery_manager.cpp Constructs pipeline in HYBRID mode, refreshes pipeline on topic-map refresh, supports adding plugin layers, and exposes merge reports.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/plugins/plugin_manager.hpp Declares the new named introspection provider API.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_types.hpp Adds merge policy/types, merge report structure, and gap-fill config.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/merge_pipeline.hpp Declares the MergePipeline interface and merge execution result.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/manifest/runtime_linker.hpp Extends linking result stats/summary for conflicts and wildcard multi-match.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/runtime_layer.hpp Declares the runtime discovery layer wrapper and gap-fill filtering support.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/plugin_layer.hpp Declares the plugin discovery layer wrapper for introspection providers.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/layers/manifest_layer.hpp Declares the manifest discovery layer wrapper.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/hybrid_discovery.hpp Updates HYBRID strategy interface to pipeline-based caching and reporting.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_manager.hpp Adds merge-pipeline config, plugin-layer API, and merge-report accessors.
src/ros2_medkit_gateway/include/ros2_medkit_gateway/discovery/discovery_layer.hpp Introduces the DiscoveryLayer interface and LayerOutput struct.
src/ros2_medkit_gateway/config/gateway_params.yaml Documents new merge-pipeline gap-fill configuration parameters.
src/ros2_medkit_gateway/CMakeLists.txt Adds new pipeline/layer sources and new test_merge_pipeline target.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 24 out of 24 changed files in this pull request and generated 4 comments.

@bburda bburda force-pushed the feature/merge-pipeline branch from 5484fe5 to 8b47dd7 Compare March 4, 2026 20:08
@bburda bburda marked this pull request as ready for review March 4, 2026 21:15
@bburda bburda requested review from Copilot and mfaferek93 March 4, 2026 21:16
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 31 out of 31 changed files in this pull request and generated 5 comments.

Comment on lines +24 to +33
/// Path-segment-boundary namespace match: "/nav" matches "/nav" and "/nav/sub" but NOT "/navigation"
bool namespace_matches(const std::string & actual_ns, const std::string & expected_ns) {
if (actual_ns == expected_ns) {
return true;
}
if (actual_ns.size() > expected_ns.size() && actual_ns.compare(0, expected_ns.size(), expected_ns) == 0 &&
actual_ns[expected_ns.size()] == '/') {
return true;
}
return false;
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change updates namespace matching to allow path-segment-boundary prefixes (e.g., expected "/nav" matches actual "/nav/sub"). The docs in docs/config/manifest-schema.rst and the manifest discovery tutorial currently describe namespace matching as “exact match”. Please update those docs to reflect the new matching semantics, otherwise users may be surprised by bindings matching deeper namespaces.

Copilot uses AI. Check for mistakes.
Comment on lines +323 to +327
// Start with highest-priority layer's entity as base
Entity merged = std::move(entries[0].entity);
size_t owner_layer_idx = entries[0].layer_idx;
report.entity_source[id] = layers_[owner_layer_idx]->name();

Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In merge_entities(), the merge policies for subsequent layers are always evaluated against layers_[owner_layer_idx] (the first/initial layer for this entity). If a lower-priority layer wins a field group due to a higher MergePolicy, the code does not update the “owner” for that field group, so a third (or later) layer will be compared against the wrong target policy. This can lead to incorrect precedence/overrides and inaccurate conflict reporting in multi-layer pipelines. Consider tracking the effective/winning layer per FieldGroup (or recomputing target_policy from the current winner) and updating it when SOURCE wins, so later merges use the correct target policy.

Copilot uses AI. Check for mistakes.

**Built-in layer policies:**

- **ManifestLayer** (priority 1): All field groups AUTHORITATIVE
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list states that ManifestLayer uses AUTHORITATIVE for all field groups, but the actual defaults in ManifestLayer are LIVE_DATA=ENRICHMENT and STATUS=FALLBACK (with IDENTITY/HIERARCHY/METADATA authoritative). Please align the documentation with the implemented default policies (or adjust the code if the doc is intended).

Suggested change
- **ManifestLayer** (priority 1): All field groups AUTHORITATIVE
- **ManifestLayer** (priority 1): IDENTITY/HIERARCHY/METADATA are AUTHORITATIVE, LIVE_DATA is ENRICHMENT, STATUS is FALLBACK

Copilot uses AI. Check for mistakes.

**Built-in Layers:**

- ``ManifestLayer`` - Wraps ManifestManager; all fields AUTHORITATIVE (manifest is source of truth)
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This section says ManifestLayer is AUTHORITATIVE for all fields, but the implemented defaults are LIVE_DATA=ENRICHMENT and STATUS=FALLBACK. Please update this design doc to match the real merge policy defaults to avoid confusion when debugging merge results.

Suggested change
- ``ManifestLayer`` - Wraps ManifestManager; all fields AUTHORITATIVE (manifest is source of truth)
- ``ManifestLayer`` - Wraps ManifestManager; defaults to AUTHORITATIVE for all field groups except
``LIVE_DATA`` (ENRICHMENT) and ``STATUS`` (FALLBACK), with the manifest remaining the primary source of truth

Copilot uses AI. Check for mistakes.
Comment on lines +187 to +188
# Per-layer policy overrides (optional)
# Defaults: manifest=AUTH for identity/hierarchy, runtime=AUTH for live_data/status
Copy link

Copilot AI Mar 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gateway_params.yaml documents per-layer merge policy overrides under discovery.merge_pipeline.layers, but this PR only declares/reads gap_fill parameters (no layer policy override params are wired). This is likely to confuse users since it implies a supported configuration. Either implement the policy override parameters in GatewayNode/DiscoveryManager or reword this block as a future/unsupported feature.

Suggested change
# Per-layer policy overrides (optional)
# Defaults: manifest=AUTH for identity/hierarchy, runtime=AUTH for live_data/status
# Per-layer policy overrides (FUTURE / UNSUPPORTED FEATURE)
# NOTE: The Gateway currently only reads merge_pipeline.gap_fill parameters.
# Any "layers" configuration is ignored in the current implementation
# and is reserved for a future version of the merge pipeline.
# Do NOT rely on these options for behavior in this release.
#
# Intended defaults (subject to change when implemented):
# - manifest = authoritative for identity/hierarchy
# - runtime = authoritative for live_data/status
#
# Example (NOT YET SUPPORTED):

Copilot uses AI. Check for mistakes.
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 4 comments.

Comment on lines 49 to 53
std::lock_guard<std::mutex> lock(mutex_);
cached_result_ = pipeline_.execute();
if (node_) {
RCLCPP_INFO(node_->get_logger(), "Hybrid discovery refreshed: %zu entities", cached_result_.report.total_entities);
}
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HybridDiscoveryStrategy::refresh() holds mutex_ while executing pipeline_.execute(). Pipeline execution can involve ROS graph introspection and plugin work and may take a non-trivial amount of time, blocking concurrent discover_* calls (and /health merge report reads). Consider executing the pipeline outside the lock and then swapping cached_result_ under the mutex to reduce contention while still keeping results consistent.

Suggested change
std::lock_guard<std::mutex> lock(mutex_);
cached_result_ = pipeline_.execute();
if (node_) {
RCLCPP_INFO(node_->get_logger(), "Hybrid discovery refreshed: %zu entities", cached_result_.report.total_entities);
}
// Run the potentially expensive pipeline execution without holding the mutex
auto new_result = pipeline_.execute();
if (node_) {
RCLCPP_INFO(
node_->get_logger(), "Hybrid discovery refreshed: %zu entities",
new_result.report.total_entities);
}
// Update the cached result under the mutex to keep readers consistent
std::lock_guard<std::mutex> lock(mutex_);
cached_result_ = std::move(new_result);

Copilot uses AI. Check for mistakes.
Comment on lines +329 to +344
// Merge with each subsequent (lower-priority) layer
for (size_t i = 1; i < entries.size(); i++) {
size_t source_layer_idx = entries[i].layer_idx;
report.enriched_count++;

for (auto fg : ALL_FIELD_GROUPS) {
auto target_policy = layers_[owner_layer_idx]->policy_for(fg);
auto source_policy = layers_[source_layer_idx]->policy_for(fg);
auto res = resolve_policies(target_policy, source_policy);

if (res.is_conflict) {
report.conflicts.push_back({id, fg, layers_[owner_layer_idx]->name(), layers_[source_layer_idx]->name()});
report.conflict_count++;
}

apply_field_group_merge(merged, entries[i].entity, fg, res);
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge_entities() keeps owner_layer_idx fixed to the first layer that introduced the entity. If a later layer wins a field group (e.g., owner FALLBACK but source AUTHORITATIVE), subsequent merges still compare against the original owner's policy, which can let an even-lower-priority layer override an already-winning AUTHORITATIVE value and also mis-report conflicts. Consider tracking the current “owning layer” per FieldGroup (or performing pairwise merges in layer-priority order for each FieldGroup) so later merges compare against the policy of the layer that currently owns that field group’s value.

Suggested change
// Merge with each subsequent (lower-priority) layer
for (size_t i = 1; i < entries.size(); i++) {
size_t source_layer_idx = entries[i].layer_idx;
report.enriched_count++;
for (auto fg : ALL_FIELD_GROUPS) {
auto target_policy = layers_[owner_layer_idx]->policy_for(fg);
auto source_policy = layers_[source_layer_idx]->policy_for(fg);
auto res = resolve_policies(target_policy, source_policy);
if (res.is_conflict) {
report.conflicts.push_back({id, fg, layers_[owner_layer_idx]->name(), layers_[source_layer_idx]->name()});
report.conflict_count++;
}
apply_field_group_merge(merged, entries[i].entity, fg, res);
// Track current owning layer per field group as we merge
std::vector<size_t> owner_layer_idx_per_fg;
owner_layer_idx_per_fg.resize(ALL_FIELD_GROUPS.size(), owner_layer_idx);
// Merge with each subsequent (lower-priority) layer
for (size_t i = 1; i < entries.size(); i++) {
size_t source_layer_idx = entries[i].layer_idx;
report.enriched_count++;
for (size_t fg_index = 0; fg_index < ALL_FIELD_GROUPS.size(); ++fg_index) {
auto fg = ALL_FIELD_GROUPS[fg_index];
size_t current_owner_idx = owner_layer_idx_per_fg[fg_index];
auto target_policy = layers_[current_owner_idx]->policy_for(fg);
auto source_policy = layers_[source_layer_idx]->policy_for(fg);
auto res = resolve_policies(target_policy, source_policy);
if (res.is_conflict) {
report.conflicts.push_back({id, fg, layers_[current_owner_idx]->name(), layers_[source_layer_idx]->name()});
report.conflict_count++;
}
apply_field_group_merge(merged, entries[i].entity, fg, res);
// If the source has a strictly higher-priority policy for this field group and
// there is no conflict, treat the source layer as the new owner for future merges.
if (!res.is_conflict) {
int target_prio = policy_priority(target_policy);
int source_prio = policy_priority(source_policy);
if (source_prio > target_prio) {
owner_layer_idx_per_fg[fg_index] = source_layer_idx;
}
}

Copilot uses AI. Check for mistakes.
Comment on lines +370 to +385
for (size_t i = 0; i < layers_.size(); ++i) {
// Build discovery context from entities collected so far (for plugin layers)
IntrospectionInput context;
for (const auto & [idx, entities] : area_layers) {
context.areas.insert(context.areas.end(), entities.begin(), entities.end());
}
for (const auto & [idx, entities] : component_layers) {
context.components.insert(context.components.end(), entities.begin(), entities.end());
}
for (const auto & [idx, entities] : app_layers) {
context.apps.insert(context.apps.end(), entities.begin(), entities.end());
}
for (const auto & [idx, entities] : function_layers) {
context.functions.insert(context.functions.end(), entities.begin(), entities.end());
}
layers_[i]->set_discovery_context(context);
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MergePipeline::execute() rebuilds IntrospectionInput from scratch on every layer by re-copying all prior layer entity vectors. With multiple plugin layers and large entity sets this becomes O(L^2) copying and can noticeably increase discovery latency. Consider maintaining an accumulating context (append just the previous layer’s output each iteration), or building the context from the already-merged-so-far result to avoid repeated full copies.

Copilot uses AI. Check for mistakes.
}
// Default: each node = 1 component (backward compatible)
return discover_node_components();
auto apps = discover_apps();
Copy link

Copilot AI Mar 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RuntimeDiscoveryStrategy::discover_components() now always derives Components via discover_synthetic_components(apps) and ignores config_.create_synthetic_components. This makes the "legacy" mode (create_synthetic_components=false => each node is a Component) impossible and changes component IDs/FQNs even when users explicitly disable synthetic components. Consider restoring the conditional: when create_synthetic_components is false, return per-node Components (with per-node fqn/namespace_path and live data) rather than synthetic groupings.

Suggested change
auto apps = discover_apps();
auto apps = discover_apps();
// Honor configuration: when synthetic components are disabled, each App becomes its own Component.
if (!config_.create_synthetic_components) {
std::vector<Component> components;
components.reserve(apps.size());
for (const auto & app : apps) {
Component component;
// Preserve per-node identity and namespace information where available
component.id = app.id;
component.namespace_path = app.namespace_path;
component.fqn = app.fqn;
component.area_id = app.area_id;
components.push_back(std::move(component));
}
return components;
}
// Default behavior: group Apps into synthetic Components based on the configured strategy.

Copilot uses AI. Check for mistakes.
@bburda bburda requested a review from Copilot March 5, 2026 16:41
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

bburda added 6 commits March 5, 2026 20:52
Introduce MergePolicy (AUTHORITATIVE/ENRICHMENT/FALLBACK), FieldGroup enums,
MergeReport diagnostics, DiscoveryLayer interface and LayerOutput struct.
Implement MergePipeline core with single-layer passthrough and multi-layer
field-group merge with conflict resolution.
Implement concrete DiscoveryLayer wrappers: ManifestLayer (delegates to
ManifestManager, AUTHORITATIVE for identity/hierarchy/metadata), RuntimeLayer
(delegates to RuntimeDiscoveryStrategy, AUTHORITATIVE for live_data/status),
PluginLayer (wraps IntrospectionProvider, ENRICHMENT for all fields).
Add GapFillConfig for RuntimeLayer namespace filtering in hybrid mode.
…coveryStrategy

Implement RuntimeLinker that binds manifest apps (with ros_binding) to
runtime-discovered ROS 2 nodes (with bound_fqn) after pipeline merge.
Includes multi-match detection, binding conflict reporting, orphan node
warnings, and deterministic namespace matching. Wire MergePipeline into
HybridDiscoveryStrategy replacing the ad-hoc merge logic.
Integrate IntrospectionProviders as pipeline layers with priority ordering
and batch registration. Add YAML configuration for merge pipeline gap-fill.
Expose MergeReport and LinkingResult in GET /health endpoint. Pass merged
entity context (IntrospectionInput) to plugin layers before discover().
Add comprehensive documentation for merge pipeline architecture, policies,
gap-fill options, and merge report diagnostics.
Add VendorExtensionPlugin demo that registers custom entities and routes
via the IntrospectionProvider interface. Add integration test validating
plugin layer discovery, entity merging, and context passing through the
merge pipeline.
Fix linker errors (TPOFF32, R_X86_64_PC32) when gateway_lib.a is linked
into test_gateway_plugin.so (MODULE target). Remove static thread_local
std::mt19937 in BulkDataStore (incompatible with initial-exec TLS model).
Enable POSITION_INDEPENDENT_CODE on ros2_medkit_serialization static lib.
Remove @verifies tag referencing nonexistent REQ_DISC_MERGE_FUNCTION.
@bburda bburda force-pushed the feature/merge-pipeline branch from 61874c0 to e886342 Compare March 5, 2026 20:01
@bburda bburda changed the title feat(discovery): add layered MergePipeline for multi-source entity discovery Add layered MergePipeline for multi-source entity discovery Mar 5, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

discovery Discovery endpoints or strategies enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Discovery: Implement merge pipeline for discovery hybrid approach

2 participants